package com.atguigu.utils;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * Custom Flink source that continuously emits randomly generated {@link ClickEvent}s.
 *
 * <p>The generic parameter of {@link SourceFunction} specifies the element type
 * produced by this source. Each event pairs a randomly chosen user with a randomly
 * chosen URL and is stamped with the current wall-clock time in milliseconds.
 * One event is emitted roughly every {@value #EMIT_INTERVAL_MS} ms until
 * {@link #cancel()} is called.
 */
public class ClickSource implements SourceFunction<ClickEvent> {
    /** Pause between two consecutive events, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 100L;

    // volatile: cancel() is invoked from a different thread than run(), so the
    // write must be visible to the emitting loop for the source to stop.
    private volatile boolean running = true;
    private final Random random = new Random();
    private final String[] userArr = {"Allen", "James", "Iverson", "KD", "KG","Jordan","Shark"};
    private final String[] urlArr = {"./home", "./cart", "./fav", "./product?id=1", "./product?id=2"};

    /**
     * Emits click events in a loop until the source is cancelled.
     *
     * @param ctx context used to send records downstream
     * @throws Exception if the thread is interrupted while sleeping between events
     */
    @Override
    public void run(SourceContext<ClickEvent> ctx) throws Exception {
        while (running) {
            // collect() sends the record downstream.
            ctx.collect(
                    new ClickEvent(
                            userArr[random.nextInt(userArr.length)],
                            urlArr[random.nextInt(urlArr.length)],
                            // Same value Calendar.getInstance().getTimeInMillis() yields,
                            // without allocating a Calendar per event.
                            System.currentTimeMillis()
                    )
            );
            Thread.sleep(EMIT_INTERVAL_MS);
        }
    }

    /**
     * Stops the emitting loop; invoked when the job is cancelled
     * (e.g. via {@code flink cancel <JobId>}).
     */
    @Override
    public void cancel() {
        running = false;
    }
}
